package backtype.storm.metric;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Map;

import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.Utils;

/*
 * Listens for all metrics, dumps them to log
 *
 * To use, add this to your topology's configuration:
 *   conf.registerMetricsConsumer(backtype.storm.metrics.LoggingMetricsConsumer.class, 1);
 *
 * Or edit the storm.yaml config file:
 *
 *   topology.metrics.consumer.register:
 *     - class: "backtype.storm.metrics.LoggingMetricsConsumer"
 *       parallelism.hint: 1
 *
 */
public class LoggingMetricsConsumer implements IMetricsConsumer {
	public static final Logger LOG = LoggerFactory
			.getLogger(LoggingMetricsConsumer.class);

	@Override
	public void prepare(Map stormConf, Object registrationArgument,
			TopologyContext context, IErrorReporter errorReporter) {
	}

	static private String padding = "                       ";

	@Override
	public void handleDataPoints(TaskInfo taskInfo,
			Collection<DataPoint> dataPoints) {
		StringBuilder sb = new StringBuilder();
		String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
				taskInfo.timestamp, taskInfo.srcWorkerHost,
				taskInfo.srcWorkerPort, taskInfo.srcTaskId,
				taskInfo.srcComponentId);
		sb.append(header);
		for (DataPoint p : dataPoints) {
			sb.delete(header.length(), sb.length());
			sb.append(p.name).append(padding)
					.delete(header.length() + 23, sb.length()).append("\t")
					.append(p.value);
			LOG.info(sb.toString());
		}
	}

	@Override
	public void cleanup() {
	}
}
